AWS IoTを使って増えたRaspberry Piを手軽にシャットダウン Python編

AWS IoTを使って増えたRaspberry Piを手軽にシャットダウン Python編

気がつくと身の回りにRaspberry Piが増えていませんか?増えたRaspberry Piを手軽にシャットダウンできる方法をツールを使わずAWS IoTのmqttのsubscribe機能を使う方法をPythonで実装してみました。
Clock Icon2019.09.30

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

こんにちはCX事業本部のさかじです。 気がつくと身の回りにRaspberry Piが増えていませんか?増えたRaspberry Piを手軽にシャットダウンできる方法をツールを使わずAWS IoTのmqttのsubscribe機能を使う方法をPythonで実装してみました。 Node.js編

環境

前提条件

Raspberry PiからAWS IoTへデータをmqttを使用して送信できている環境を使用してください。本ソースコードはエラー等考慮した作りになっていませんのでご注意ください。

Raspberry pi側ソースコード

app.py

#!/usr/bin/env python3
import time
import json
import os

class IotMqttClient:

# AWS IOTと接続するときの設定
###############
### 要確認 ###
###############

ROOT_CA_PATH = "./cert/root-CA.crt"
CERTIFICAT_PATH = "./cert/cert.pem"
PRIVATE_KEY_PATH = "./cert/private.key"
IOT_HOST = "xxxxxxxxx.iot.ap-northeast-1.amazonaws.com"
IOT_PORT = 8883
IOT_CLIENT_ID = "test_client"
__my_iot_mqtt_client = None
__is_connected = False

def __init__(self):
from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient

# クライアントの設定
self.__my_iot_mqtt_client = AWSIoTMQTTClient(self.IOT_CLIENT_ID)
self.__my_iot_mqtt_client.configureEndpoint(self.IOT_HOST, self.IOT_PORT)
self.__my_iot_mqtt_client.configureCredentials(self.ROOT_CA_PATH, self.PRIVATE_KEY_PATH, self.CERTIFICAT_PATH)

# 接続情報の設定
self.__my_iot_mqtt_client.configureAutoReconnectBackoffTime(1, 32, 20)
self.__my_iot_mqtt_client.configureOfflinePublishQueueing(-1) # Infinite offline Publish queueing
self.__my_iot_mqtt_client.configureDrainingFrequency(2) # Draining: 2 Hz
self.__my_iot_mqtt_client.configureConnectDisconnectTimeout(10) # 10 sec
self.__my_iot_mqtt_client.configureMQTTOperationTimeout(5) # 5 sec

def on_offline(): self.__is_connected = False
def on_online(): self.__is_connected = True

self.__my_iot_mqtt_client.onOffline = on_offline
self.__my_iot_mqtt_client.onOnline = on_online

# 接続開始
self.__my_iot_mqtt_client.connect()

# サブスクライブする設定
self.__my_iot_mqtt_client.subscribe('mytopic/halt', 1, sub_callback_halt)

def publish(self, IOT_TOPIC, unpacked_data):
if not self.__is_connected:
self.__my_iot_mqtt_client.connect()
self.__my_iot_mqtt_client.publishAsync(IOT_TOPIC, str(unpacked_data), 1, ackCallback=None)

def sub_callback_halt(client, userdata, message):
print(message.payload)
res = os.system('sudo halt')

iot_mqtt_client = IotMqttClient()

while True:
pass

Raspberry Piのアプリケーション起動

$ ./app.py

AWS IoT マネジメントコンソール

"AWS IoT" - "テスト" - "発行"で下記のように入力して"トピックに発行"をクリックします。 しばらくするとRaspberry Piが終了して電源切れる状態になります。

参考サイト

最後に

前回と違い今回はトピック名を直接"halt"コマンドにしてみました。乱暴なやり方なのでもうちょっと良い方法があるかもしれませんが、簡単にシャットダウンさせるには使えるのでは無いかと思っています。

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.